package com.example.mq.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

/**
 * Kafka消费者
 * Created by liulanhua on 2018/8/31.
 */
@Component
public class KafkaConsumer {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    /*@KafkaListener(topics = {"${spring.kafka.template.default-topic}"},
            containerFactory = "kafkaListenerContainerFactory")*/
    public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
        try {
            logger.info("接收消息: offset = {}, key = {}, value = {} ",
                    record.offset(), record.key(), record.value());
        } catch (Exception e) {
            logger.error("kafka接收消息异常",e);
        } finally {
            //手动提交偏移量
            ack.acknowledge();
        }
    }


}
